package fun.easycode.datastream;

import cn.hutool.core.collection.CollectionUtil;
import com.fasterxml.jackson.core.type.TypeReference;
import fun.easycode.jointblock.core.CheckException;
import fun.easycode.jointblock.core.JointBlockMapper;
import fun.easycode.jointblock.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;

import java.util.List;
import java.util.stream.Collectors;

/**
 * @author xuzhen97
 */
@Slf4j
public class DataTransformerEntryConsumer<T,R> {
    private final IDataTransformer<T,R> transform;
    private final DefaultMQPushConsumer consumer;
    private final DataStreamProperties properties;

    public DataTransformerEntryConsumer(IDataTransformer<T,R> transform, DataStreamProperties properties) throws MQClientException {
        this.transform = transform;
        this.properties = properties;
        consumer = createConsumer();
    }

    public void start() throws MQClientException {
        consumer.start();
    }

    private DefaultMQPushConsumer createConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(transform.getMark());
        consumer.setNamesrvAddr(properties.getRocketMq().getNameSrvAddr());
        consumer.subscribe(properties.getRocketMq().getDefaultTransformTopic(), transform.getMark());

        consumer.registerMessageListener((MessageListenerOrderly) (msgList, consumeOrderlyContext) -> {
            if (CollectionUtil.isEmpty(msgList)) {
                return ConsumeOrderlyStatus.SUCCESS;
            }
            // 将消息转换为QuickEntry对象
            List<DataEntry<T>> data = msgList.stream()
                    .map(msg-> JacksonUtil.readValue(new String(msg.getBody()), new TypeReference<DataEntry<T>>() {
                    }))
                    .collect(Collectors.toList());

            // 获取数据处理器
            IDataProcessor processor = DataContext.getProcessor(transform.getParam().getProcessor());
            // 获取mapper
            JointBlockMapper jointBlockMapper = DataContext.getMapper(processor.getParam().getInputMapper());

            // 对于转换的数据，一条一条的处理
            for(DataEntry<T> entry : data){
                String transformWhereSql = entry.getExtra().get("transformWhereSql");
                if(entry.getDataMode() == DataMode.COPE){
                    try {
                        // 流式处理方式
                        DataResultHandler handler =
                                new DataResultHandler<>((inputData)-> DataContext.tryLock(processor, ()-> processor.process(inputData)));
                        // 调用流式查询方法
                        jointBlockMapper.streamQuerySql(transformWhereSql, handler);
                        // 最后一次的缓存需要手动处理
                        DataContext.tryLock(processor, ()-> processor.process(handler.getCache()));
                    }catch (Exception e){
                        log.error("数据处理异常", e);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }else if (entry.getDataMode() == DataMode.CLEAN_UP){
                    try {
                        // 流式处理方式
                        DataResultHandler handler =
                                new DataResultHandler<>((cleanData)-> DataContext.tryLock(processor, ()-> processor.cleanUp(cleanData)));
                        // 调用流式查询方法
                        jointBlockMapper.streamQuerySql(transformWhereSql, handler);
                        // 最后一次的缓存需要手动处理
                        DataContext.tryLock(processor, ()-> processor.cleanUp(handler.getCache()));
                    }catch (Exception e){
                        log.error("数据处理异常", e);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }else{
                    throw new CheckException("不支持的处理模式");
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        });
        return consumer;
    }
}
